{
 "cells": [
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "## 103 - Simplifying Machine Learning Pipelines with `mmlspark`\n",
    "\n",
    "### 1. Introduction\n",
    "\n",
    "<p><img src=\"https://images-na.ssl-images-amazon.com/images/G/01/img16/books/bookstore",
    "/landing-page/1000638_books_landing-page_bookstore-photo-01.jpg\"",
    " style=\"width: 500px;\"",
    " title=\"Image from https://images-na.ssl-images-amazon.com/images/G/01/img16/books/bookstore",
    "/landing-page/1000638_books_landing-page_bookstore-photo-01.jpg\" /><br /></p>\n",
    "\n",
    "In this tutorial, we perform the same classification task in two\n",
    "different ways: once using plain **`pyspark`** and once using the\n",
    "**`mmlspark`** library.  The two methods yield the same performance,\n",
    "but one of the two libraries is drastically simpler to use and iterate\n",
    "on (can you guess which one?).\n",
    "\n",
    "The task is simple: Predict whether a user's review of a book sold on\n",
    "Amazon is good (rating > 3) or bad based on the text of the review.  We\n",
    "accomplish this by training LogisticRegression learners with different\n",
    "hyperparameters and choosing the best model."
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "### 2. Read the data\n",
    "\n",
    "We download and read in the data. We show a sample below:"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "import pandas as pd\n",
    "import mmlspark\n",
    "from pyspark.sql.types import IntegerType, StringType, StructType, StructField\n",
    "\n",
    "dataFile = \"BookReviewsFromAmazon10K.tsv\"\n",
    "textSchema = StructType([StructField(\"rating\", IntegerType(), False),\n",
    "                         StructField(\"text\", StringType(), False)])\n",
    "import os, urllib\n",
    "if not os.path.isfile(dataFile):\n",
    "    urllib.request.urlretrieve(\"https://mmlspark.azureedge.net/datasets/\"+dataFile, dataFile)\n",
    "raw_data = spark.createDataFrame(pd.read_csv(dataFile, sep=\"\\t\", header=None), textSchema)\n",
    "raw_data.show(5)"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "### 3. Extract more features and process data\n",
    "\n",
    "Real data however is more complex than the above dataset. It is common\n",
    "for a dataset to have features of multiple types: text, numeric,\n",
    "categorical.  To illustrate how difficult it is to work with these\n",
    "datasets, we add two numerical features to the dataset: the **word\n",
    "count** of the review and the **mean word length**."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "from pyspark.sql.functions import udf\n",
    "from pyspark.sql.types import LongType, FloatType, DoubleType\n",
    "def word_count(s):\n",
    "    return len(s.split())\n",
    "def word_length(s):\n",
    "    import numpy as np\n",
    "    ss = [len(w) for w in s.split()]\n",
    "    return round(float(np.mean(ss)), 2)\n",
    "word_length_udf = udf(word_length, DoubleType())\n",
    "word_count_udf = udf(word_count, IntegerType())"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "data = raw_data \\\n",
    "       .select(\"rating\", \"text\",\n",
    "               word_count_udf(\"text\").alias(\"wordCount\"),\n",
    "               word_length_udf(\"text\").alias(\"wordLength\")) \\\n",
    "       .withColumn(\"label\", raw_data[\"rating\"] > 3).drop(\"rating\")"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "data.show(5)"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "### 4a. Classify using pyspark\n",
    "\n",
    "To choose the best LogisticRegression classifier using the `pyspark`\n",
    "library, need to *explictly* perform the following steps:\n",
    "\n",
    "1. Process the features:\n",
    "   * Tokenize the text column\n",
    "   * Hash the tokenized column into a vector using hashing\n",
    "   * Merge the numeric features with the vector in the step above\n",
    "2. Process the label column: cast it into the proper type.\n",
    "3. Train multiple LogisticRegression algorithms on the `train` dataset\n",
    "   with different hyperparameters\n",
    "4. Compute the area under the ROC curve for each of the trained models\n",
    "   and select the model with the highest metric as computed on the\n",
    "   `test` dataset\n",
    "5. Evaluate the best model on the `validation` set\n",
    "\n",
    "As you can see below, there is a lot of work involved and a lot of\n",
    "steps where something can go wrong!"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "from pyspark.ml.feature import Tokenizer, HashingTF\n",
    "from pyspark.ml.feature import VectorAssembler\n",
    "\n",
    "# Featurize text column\n",
    "tokenizer = Tokenizer(inputCol=\"text\", outputCol=\"tokenizedText\")\n",
    "numFeatures = 10000\n",
    "hashingScheme = HashingTF(inputCol=\"tokenizedText\",\n",
    "                          outputCol=\"TextFeatures\",\n",
    "                          numFeatures=numFeatures)\n",
    "tokenizedData = tokenizer.transform(data)\n",
    "featurizedData = hashingScheme.transform(tokenizedData)\n",
    "\n",
    "# Merge text and numeric features in one feature column\n",
    "feature_columns_array = [\"TextFeatures\", \"wordCount\", \"wordLength\"]\n",
    "assembler = VectorAssembler(\n",
    "    inputCols = feature_columns_array,\n",
    "    outputCol=\"features\")\n",
    "assembledData = assembler.transform(featurizedData)\n",
    "\n",
    "# Select only columns of interest\n",
    "# Convert rating column from boolean to int\n",
    "processedData = assembledData \\\n",
    "                .select(\"label\", \"features\") \\\n",
    "                .withColumn(\"label\", assembledData.label.cast(IntegerType()))"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "from pyspark.ml.evaluation import BinaryClassificationEvaluator\n",
    "from pyspark.ml.classification import LogisticRegression\n",
    "\n",
    "# Prepare data for learning\n",
    "train, test, validation = processedData.randomSplit([0.60, 0.20, 0.20], seed=123)\n",
    "\n",
    "# Train the models on the 'train' data\n",
    "lrHyperParams = [0.05, 0.1, 0.2, 0.4]\n",
    "logisticRegressions = [LogisticRegression(regParam = hyperParam)\n",
    "                       for hyperParam in lrHyperParams]\n",
    "evaluator = BinaryClassificationEvaluator(rawPredictionCol=\"rawPrediction\",\n",
    "                                          metricName=\"areaUnderROC\")\n",
    "metrics = []\n",
    "models = []\n",
    "\n",
    "# Select the best model\n",
    "for learner in logisticRegressions:\n",
    "    model = learner.fit(train)\n",
    "    models.append(model)\n",
    "    scored_data = model.transform(test)\n",
    "    metrics.append(evaluator.evaluate(scored_data))\n",
    "best_metric = max(metrics)\n",
    "best_model = models[metrics.index(best_metric)]\n",
    "\n",
    "# Save model\n",
    "best_model.write().overwrite().save(\"SparkMLExperiment.mmls\")\n",
    "# Get AUC on the validation dataset\n",
    "scored_val = best_model.transform(validation)\n",
    "print(evaluator.evaluate(scored_val))"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "### 4b. Classify using mmlspark\n",
    "\n",
    "Life is a lot simpler when using `mmlspark`!\n",
    "\n",
    "1. The **`TrainClassifier`** Estimator featurizes the data internally,\n",
    "   as long as the columns selected in the `train`, `test`, `validation`\n",
    "   dataset represent the features\n",
    "\n",
    "2. The **`FindBestModel`** Estimator find the best model from a pool of\n",
    "   trained models by find the model which performs best on the `test`\n",
    "   dataset given the specified metric\n",
    "\n",
    "3. The **`CompueModelStatistics`** Transformer computes the different\n",
    "   metrics on a scored dataset (in our case, the `validation` dataset)\n",
    "   at the same time"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "from mmlspark import TrainClassifier, FindBestModel, ComputeModelStatistics\n",
    "\n",
    "# Prepare data for learning\n",
    "train, test, validation = data.randomSplit([0.60, 0.20, 0.20], seed=123)\n",
    "\n",
    "# Train the models on the 'train' data\n",
    "lrHyperParams = [0.05, 0.1, 0.2, 0.4]\n",
    "logisticRegressions = [LogisticRegression(regParam = hyperParam)\n",
    "                       for hyperParam in lrHyperParams]\n",
    "lrmodels = [TrainClassifier(model=lrm, labelCol=\"label\", numFeatures=10000).fit(train)\n",
    "            for lrm in logisticRegressions]\n",
    "\n",
    "# Select the best model\n",
    "bestModel = FindBestModel(evaluationMetric=\"AUC\", models=lrmodels).fit(test)\n",
    "\n",
    "# Save model\n",
    "bestModel.write().overwrite().save(\"MMLSExperiment.mmls\")\n",
    "# Get AUC on the validation dataset\n",
    "predictions = bestModel.transform(validation)\n",
    "metrics = ComputeModelStatistics().transform(predictions)\n",
    "print(\"Best model's AUC on validation set = \"\n",
    "      + \"{0:.2f}%\".format(metrics.first()[\"AUC\"] * 100))"
   ]
  }
 ],
 "metadata": {
  "anaconda-cloud": {},
  "kernelspec": {
   "display_name": "Python [default]",
   "language": "python",
   "name": "python3"
  },
  "language_info": {
   "codemirror_mode": {
    "name": "ipython",
    "version": 3
   },
   "file_extension": ".py",
   "mimetype": "text/x-python",
   "name": "python",
   "nbconvert_exporter": "python",
   "pygments_lexer": "ipython3",
   "version": "3.5.2"
  }
 },
 "nbformat": 4,
 "nbformat_minor": 0
}
